
[SPARK-10548] [SPARK-10563] [SQL] Fix concurrent SQL executions #8710

Closed
wants to merge 11 commits

Conversation

andrewor14
Contributor

Note: this is for master branch only. The fix for branch-1.5 is at #8721.

The query execution ID is currently passed from a thread to its children, which is not the intended behavior. This led to IllegalArgumentException: spark.sql.execution.id is already set when running queries in parallel, e.g.:

(1 to 100).par.foreach { _ =>
  sc.parallelize(1 to 5).map { i => (i, i) }.toDF("a", "b").count()
}

The cause is that SparkContext's local properties are inherited by default. This patch adds a way to exclude keys we don't want to be inherited, and makes SQL go through that code path.
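
To make the intended behavior concrete, here is a minimal sketch of the idea as originally described (illustrative only, not the actual SparkContext code; the object and field names are assumptions): the child thread copies the parent's local properties, but filters out an excluded set of keys such as the SQL execution ID.

import java.util.Properties
import scala.collection.JavaConverters._

object NonInheritedKeysSketch {
  // Hypothetical exclusion set; in the real patch this lives inside SparkContext.
  val nonInheritedKeys: Set[String] = Set("spark.sql.execution.id")

  val localProperties = new InheritableThreadLocal[Properties] {
    override def initialValue(): Properties = new Properties()
    // Copy everything from the parent except the excluded keys.
    override def childValue(parent: Properties): Properties = {
      val child = new Properties()
      parent.stringPropertyNames().asScala
        .filterNot(nonInheritedKeys.contains)
        .foreach(key => child.setProperty(key, parent.getProperty(key)))
      child
    }
  }
}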

Andrew Or added 2 commits September 10, 2015 16:37
such as, cough cough, the SQL execution ID. This was a problem
because Scala's parallel collections spawn threads as children
of the existing threads, causing the execution ID to be inherited
when it shouldn't be.
Because java.util.Properties' remove method takes in an Any
instead of a String, there were some issues with matching the
key's hashCode, so removing was not successful in unit tests.

Instead, this commit fixes it by manually filtering out the keys
and adding them to the child thread's properties.
@andrewor14
Contributor Author

@davies @zsxwing FYI

@SparkQA

SparkQA commented Sep 11, 2015

Test build #42307 has finished for PR 8710 at commit 3ec715c.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

// We need to run this multiple times to ensure new threads are spawned. Without the fix
// for SPARK-10548, this usually fails on the second try.
val df = sparkContext.parallelize(1 to 5).map { i => (i, i) }.toDF("a", "b")
(1 to 10).par.foreach { _ => df.count() }
Member

I thought DataFrame is not thread-safe and should not be used like this.

Contributor

What makes you think that? SparkContext/SQLContext/DataFrames should be thread-safe.

Member

Contributor

Ah, I think we have fixed that, and if not I would also consider that a bug :)

@zsxwing
Member

zsxwing commented Sep 11, 2015

The cause is SparkContext's local properties are inherited by default.

I just realized this is not the cause. There is no new thread running queries between sc.setLocalProperty(EXECUTION_ID_KEY, executionId.toString) and sc.setLocalProperty(EXECUTION_ID_KEY, null) in your example. Actually, I found that sc.setLocalProperty(EXECUTION_ID_KEY, null) sometimes cannot clear EXECUTION_ID_KEY. I will investigate it.

@zsxwing
Member

zsxwing commented Sep 11, 2015

Ah, I see. So the issue is:

  1. Thread A creates a new Thread B
  2. Thread A starts to run a query (sets the execution ID property)
  3. Thread A is running the query
  4. Thread B sees the execution ID set in Thread A's properties in step 2, so it throws an exception.
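
Here is a minimal, Spark-free sketch of this sequence (names are illustrative). It mirrors the pre-patch childValue, in which the child's Properties keeps the parent's as defaults, so the ID set in step 2 is visible to Thread B even though B was created before the query started:

import java.util.Properties
import java.util.concurrent.CountDownLatch

object InheritedDefaultsSketch extends App {
  val props = new InheritableThreadLocal[Properties] {
    override def initialValue(): Properties = new Properties()
    // Pre-patch behavior: the child's Properties references the parent's as defaults.
    override def childValue(parent: Properties): Properties = new Properties(parent)
  }
  props.get() // materialize a value in the parent thread (Thread A)

  val queryStarted = new CountDownLatch(1)
  val threadB = new Thread(new Runnable {
    override def run(): Unit = {
      queryStarted.await() // wait until A has "started its query"
      // Step 4: B sees the ID that A set AFTER B was created, via the defaults chain.
      println("Thread B sees: " + props.get().getProperty("spark.sql.execution.id"))
    }
  })
  threadB.start() // step 1: A creates B

  props.get().setProperty("spark.sql.execution.id", "0") // step 2: A starts a query
  queryStarted.countDown()
  threadB.join() // prints "Thread B sees: 0"
}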

@@ -348,10 +348,27 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli

// Thread Local variable that can be used by users to pass information down the stack
private val localProperties = new InheritableThreadLocal[Properties] {
override protected def childValue(parent: Properties): Properties = new Properties(parent)
Member

@andrewor14 I'm thinking maybe we should not use new Properties(parent) here. Instead, always copy the parent's Properties to the child's Properties. Do you think the child thread needs to see further changes to the parent thread's Properties after it is created?

This is really confusing when using an Executor like ForkJoinPool (scala.concurrent.ExecutionContext.Implicits.global), in which thread A creates thread B even though thread B is not logically a child of thread A, yet thread B can still see the changes made in thread A.

/cc @jerryshao since you added this line.
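
For illustration, here is a hedged sketch of the ForkJoinPool situation described above (the names and the printed outcome are assumptions, not part of the patch): a pool worker thread is physically constructed by whichever thread first submits work, so it inherits that thread's InheritableThreadLocal value even though it is not logically a child of that thread.

import java.util.Properties
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object ForkJoinInheritSketch extends App {
  val props = new InheritableThreadLocal[Properties] {
    override def initialValue(): Properties = new Properties()
    override def childValue(parent: Properties): Properties = new Properties(parent)
  }
  props.get().setProperty("some.key", "set-in-main")

  // The global ExecutionContext lazily spawns a worker thread, typically from the
  // submitting thread, so the worker inherits the submitter's properties.
  val seen = Future { props.get().getProperty("some.key") }
  println(Await.result(seen, 10.seconds)) // typically prints "set-in-main"
}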

Contributor

Hi @zsxwing, the reasons for using InheritableThreadLocal can be seen here (mesos/spark#937). Mainly it is used for Spark Streaming with the FIFO scheduling strategy.

Member

I see. However, new Properties(parent) keeps a reference to parent rather than copying it, so if we make any change to the parent thread's properties after creating the child thread, the child thread will see them.

Is it necessary for the child thread to keep track of further updates to the parent thread's properties? I think copying them would be more reasonable.

I didn't mean removing this line. I mean changing it to

val child = new Properties()
child.putAll(parent)
child

Contributor

Yes, I agree with you that copying is more reasonable. For now I cannot imagine any scenario that requires keeping track of the parent's properties, so I think it is OK to change it; we can always revisit this if a special scenario comes up.

Member

@jerryshao Thanks :)

@andrewor14 how about just copying the parent properties rather than adding nonInheritedLocalProperties? It looks simpler.

Contributor Author

I agree; I was actually going to do it in a separate patch. Incidentally, @tdas, @JoshRosen and I just talked about this last night and we all agreed to make it do a clone instead so the semantics are simpler.

However, my one concern is that doing so will change semantics for non-SQL users in 1.5.1, so my proposal is the following: I will make the changes in this patch and merge this patch ONLY into master. Then I'll create a new patch for branch-1.5 that will have the current changes (the ones where we don't clone except for SQL). I think that's the safest way forward.

Contributor Author

OK, I have updated this in the latest commit, and filed SPARK-10563 for this issue.

@SparkQA

SparkQA commented Sep 11, 2015

Test build #42308 has finished for PR 8710 at commit d48c114.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class ExecutorLostFailure(execId: String, isNormalExit: Boolean = false)
    • class ExecutorLossReason(val message: String) extends Serializable
    • case class ExecutorExited(exitCode: Int, isNormalExit: Boolean, reason: String)
    • case class RemoveExecutor(executorId: String, reason: ExecutorLossReason)
    • case class GetExecutorLossReason(executorId: String) extends CoarseGrainedClusterMessage
    • case class ConvertToSafeNode(conf: SQLConf, child: LocalNode) extends UnaryLocalNode(conf)
    • case class ConvertToUnsafeNode(conf: SQLConf, child: LocalNode) extends UnaryLocalNode(conf)
    • case class FilterNode(conf: SQLConf, condition: Expression, child: LocalNode)
    • case class HashJoinNode(
    • case class LimitNode(conf: SQLConf, limit: Int, child: LocalNode) extends UnaryLocalNode(conf)
    • abstract class LocalNode(conf: SQLConf) extends TreeNode[LocalNode] with Logging
    • abstract class LeafLocalNode(conf: SQLConf) extends LocalNode(conf)
    • abstract class UnaryLocalNode(conf: SQLConf) extends LocalNode(conf)
    • abstract class BinaryLocalNode(conf: SQLConf) extends LocalNode(conf)
    • case class ProjectNode(conf: SQLConf, projectList: Seq[NamedExpression], child: LocalNode)
    • case class SeqScanNode(conf: SQLConf, output: Seq[Attribute], data: Seq[InternalRow])
    • case class UnionNode(conf: SQLConf, children: Seq[LocalNode]) extends LocalNode(conf)

@andrewor14
Contributor Author

Ah, I see. So the issue is:

Thread A creates a new Thread B
Thread A starts to run a query (set the execution id property)
Thread A is running a query
Thread B sees the execution id in Thread A's properties set by step 2, then it will throw an exception.

correct!

Andrew Or added 2 commits September 11, 2015 10:47
... to make the behavior more consistent in SQL vs non-SQL cases.
@andrewor14 andrewor14 changed the title [SPARK-10548] [SQL] Fix concurrent SQL executions [SPARK-10548] [SPARK-10563] [SQL] Fix concurrent SQL executions Sep 11, 2015
@andrewor14
Contributor Author

As of the latest commit this patch should only be merged into master. I consider the fix for SPARK-10563 a little too risky for 1.5.1, so I will open a separate patch for branch-1.5 without that fix.

@andrewor14
Contributor Author

retest this please

@SparkQA

SparkQA commented Sep 11, 2015

Test build #42342 has finished for PR 8710 at commit 5297f79.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class MultilayerPerceptronClassifier(JavaEstimator, HasFeaturesCol, HasLabelCol, HasPredictionCol,
    • class MultilayerPerceptronClassificationModel(JavaModel):
    • class MinMaxScaler(JavaEstimator, HasInputCol, HasOutputCol):
    • class MinMaxScalerModel(JavaModel):
    • ("thresholds", "Thresholds in multi-class classification to adjust the probability of " +
    • class HasElasticNetParam(Params):
    • class HasFitIntercept(Params):
    • class HasStandardization(Params):
    • class HasThresholds(Params):
    • thresholds = Param(Params._dummy(), "thresholds", "Thresholds in multi-class classification to adjust the probability of predicting each class. Array must have length equal to the number of classes, with values >= 0. The class with largest value p/t is predicted, where p is the original probability of that class and t is the class' threshold.")
    • self.thresholds = Param(self, "thresholds", "Thresholds in multi-class classification to adjust the probability of predicting each class. Array must have length equal to the number of classes, with values >= 0. The class with largest value p/t is predicted, where p is the original probability of that class and t is the class' threshold.")

@SparkQA

SparkQA commented Sep 11, 2015

Test build #42357 has finished for PR 8710 at commit 5297f79.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

…executions

Conflicts:
	core/src/test/scala/org/apache/spark/ThreadingSuite.scala
@SparkQA

SparkQA commented Sep 12, 2015

Test build #42361 has finished for PR 8710 at commit 3c00cc6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class MultilayerPerceptronClassifier(JavaEstimator, HasFeaturesCol, HasLabelCol, HasPredictionCol,
    • class MultilayerPerceptronClassificationModel(JavaModel):
    • class MinMaxScaler(JavaEstimator, HasInputCol, HasOutputCol):
    • class MinMaxScalerModel(JavaModel):
    • ("thresholds", "Thresholds in multi-class classification to adjust the probability of " +
    • class HasElasticNetParam(Params):
    • class HasFitIntercept(Params):
    • class HasStandardization(Params):
    • class HasThresholds(Params):
    • thresholds = Param(Params._dummy(), "thresholds", "Thresholds in multi-class classification to adjust the probability of predicting each class. Array must have length equal to the number of classes, with values >= 0. The class with largest value p/t is predicted, where p is the original probability of that class and t is the class' threshold.")
    • self.thresholds = Param(self, "thresholds", "Thresholds in multi-class classification to adjust the probability of predicting each class. Array must have length equal to the number of classes, with values >= 0. The class with largest value p/t is predicted, where p is the original probability of that class and t is the class' threshold.")
    • case class IntersectNode(conf: SQLConf, left: LocalNode, right: LocalNode)
    • case class SampleNode(
    • case class TakeOrderedAndProjectNode(

/**
* Keys of local properties that should not be inherited by children threads.
*/
private[spark] val nonInheritedLocalProperties: HashSet[String] = new HashSet[String]
Member

Exposing a mutable HashSet in the thread-safe SparkContext looks dangerous. Actually, I suggest not adding nonInheritedLocalProperties in the master branch. How about just cloning the parent properties without adding the nonInheritedLocalProperties logic? I understand that we still need nonInheritedLocalProperties for the 1.5 branch to avoid changing the semantics.

Contributor Author

The whole point of this is to avoid inheriting the SQL execution ID, which fixes SPARK-10548. How can we fix this issue with just cloning?

Contributor Author

I made this private in the latest commit and added a setter method for it. Does this address your concern?

@andrewor14
Contributor Author

@zsxwing I just noticed a potential source of confusion. If I understand correctly, your view is that we should just clone the properties instead of having nonInheritedLocalProperties. However, as I mentioned in the reply above, just cloning the properties won't fix SPARK-10548, because the issue is that the execution ID is passed directly to the child thread, NOT that it is mutated after the child thread is spawned. Does that make sense? Please let me know if I'm missing something.

@zsxwing
Member

zsxwing commented Sep 13, 2015

the execution ID is passed directly to the child thread

I believe that this is not the cause of SPARK-10548. The cause of SPARK-10548 is the child thread can see the execution id that is set by the parent thread after the child thread is spawned.

If there is no execution id in the local properties when creating a child thread and we change it to clone the properties, then the child thread won't see the execution id that is set by the parent thread.

@SparkQA

SparkQA commented Sep 13, 2015

Test build #42379 has finished for PR 8710 at commit 35bb6f0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

The fix for SPARK-10548 can be simplified by just cloning the
parent properties on inherit rather than excluding specific
properties from ever being inherited. This is safe because the
child thread must be created BEFORE the parent thread runs a
query.
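
A minimal sketch of what the simplified childValue could look like after this change (illustrative only; the actual commit may differ, for example in how it clones the properties):

import java.util.Properties

object ClonedLocalPropertiesSketch {
  val localProperties = new InheritableThreadLocal[Properties] {
    override def initialValue(): Properties = new Properties()
    // Snapshot the parent's properties at thread-creation time; anything the parent
    // sets afterwards (e.g. the SQL execution ID) is not visible to the child.
    override def childValue(parent: Properties): Properties = {
      val child = new Properties()
      child.putAll(parent)
      child
    }
  }
}
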
@andrewor14
Contributor Author

If there is no execution id in the local properties when creating a child thread and we change it to clone the properties, then the child thread won't see the execution id that is set by the parent thread.

Ah, I see. You're saying the child thread must be created before the query is run, not while it's running. That makes sense. Previously I accounted for the case where the child thread is created in the middle of the query, which I suppose is not possible. I have updated the code based on your suggestion.

@andrewor14 andrewor14 force-pushed the concurrent-sql-executions branch from b4bcc3c to fce3819 on September 14, 2015 21:23
@andrewor14
Contributor Author

@zsxwing Alright I have updated it. Please have another look. I also updated the one for branch-1.5 (#8721), which has all of the changes here except the new behavior is triggered only in SQL.

…executions

Conflicts:
	core/src/test/scala/org/apache/spark/ThreadingSuite.scala
@SparkQA

SparkQA commented Sep 14, 2015

Test build #42441 has finished for PR 8710 at commit b4bcc3c.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 14, 2015

Test build #42447 has finished for PR 8710 at commit fce3819.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class MultilayerPerceptronClassifier(JavaEstimator, HasFeaturesCol, HasLabelCol, HasPredictionCol,
    • class MultilayerPerceptronClassificationModel(JavaModel):
    • class MinMaxScaler(JavaEstimator, HasInputCol, HasOutputCol):
    • class MinMaxScalerModel(JavaModel):
    • ("thresholds", "Thresholds in multi-class classification to adjust the probability of " +
    • class HasElasticNetParam(Params):
    • class HasFitIntercept(Params):
    • class HasStandardization(Params):
    • class HasThresholds(Params):
    • thresholds = Param(Params._dummy(), "thresholds", "Thresholds in multi-class classification to adjust the probability of predicting each class. Array must have length equal to the number of classes, with values >= 0. The class with largest value p/t is predicted, where p is the original probability of that class and t is the class' threshold.")
    • self.thresholds = Param(self, "thresholds", "Thresholds in multi-class classification to adjust the probability of predicting each class. Array must have length equal to the number of classes, with values >= 0. The class with largest value p/t is predicted, where p is the original probability of that class and t is the class' threshold.")
    • case class Stddev(child: Expression) extends StddevAgg(child)
    • case class StddevPop(child: Expression) extends StddevAgg(child)
    • case class StddevSamp(child: Expression) extends StddevAgg(child)
    • abstract class StddevAgg(child: Expression) extends AlgebraicAggregate
    • abstract class StddevAgg1(child: Expression) extends UnaryExpression with PartialAggregate1
    • case class Stddev(child: Expression) extends StddevAgg1(child)
    • case class StddevPop(child: Expression) extends StddevAgg1(child)
    • case class StddevSamp(child: Expression) extends StddevAgg1(child)
    • case class ComputePartialStd(child: Expression) extends UnaryExpression with AggregateExpression1
    • case class ComputePartialStdFunction (
    • case class MergePartialStd(
    • case class MergePartialStdFunction(
    • case class StddevFunction(
    • case class IntersectNode(conf: SQLConf, left: LocalNode, right: LocalNode)
    • case class SampleNode(
    • case class TakeOrderedAndProjectNode(

@SparkQA

SparkQA commented Sep 15, 2015

Test build #42452 has finished for PR 8710 at commit 75a8d90.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 15, 2015

Test build #1753 has finished for PR 8710 at commit 75a8d90.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 15, 2015

Test build #1751 has finished for PR 8710 at commit 75a8d90.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 15, 2015

Test build #1752 has finished for PR 8710 at commit 75a8d90.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zsxwing
Member

zsxwing commented Sep 15, 2015

LGTM

@andrewor14
Contributor Author

Thanks, I'm merging this into master.

asfgit pushed a commit that referenced this pull request Sep 15, 2015
…nch-1.5

*Note: this is for branch-1.5 only*

This is the same as #8710 but affects only SQL. The more general fix for SPARK-10563 is considered risky to backport into a maintenance release, so it is disabled by default and enabled only in SQL.

Author: Andrew Or <[email protected]>

Closes #8721 from andrewor14/concurrent-sql-executions-1.5 and squashes the following commits:

3b9b462 [Andrew Or] Merge branch 'branch-1.5' of github.com:apache/spark into concurrent-sql-executions-1.5
4435db7 [Andrew Or] Clone properties only for SQL for backward compatibility
0b7e5ab [Andrew Or] Clone parent local properties on inherit
@asfgit asfgit closed this in b6e9986 Sep 15, 2015
@andrewor14 andrewor14 deleted the concurrent-sql-executions branch September 15, 2015 23:48
@nicerobot

We are still experiencing this. See SPARK-10548.

I've verified that we are indeed using a version of Spark with the SPARK-10548 fix, yet the issue is still reproducible. In fact, if in the test case you add:

println(null != sc.getLocalProperty("spark.sql.execution.id"))
df.count()

you can anticipate when a thread will throw the exception.

@d-ee

d-ee commented Apr 15, 2016

This still seems to be around.
We're using Spark 1.5.2.

java.lang.IllegalArgumentException: "spark.sql.execution.id is already set"

Trace:
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:87)
org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:1903)
org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1384)
org.apache.spark.sql.DataFrame.head(DataFrame.scala:1314)
org.apache.spark.sql.DataFrame.head(DataFrame.scala:1321)
org.apache.spark.sql.DataFrame.first(DataFrame.scala:1328)

@zsxwing
Member

zsxwing commented Apr 15, 2016

@d-ee do you have a reproducer? Let's move the discussion to JIRA instead of here.

@ljwagerfield

We're seeing this exception too. We're also running our operations in serial (at least on the surface it seems as if we are). If we execute a df.save operation in a Future and wait for that Future to complete, then all df.save operations we perform within subsequent Futures will fail.

This specifically happens when we load Avro files from S3 and save them as Parquet back to S3. The loading works fine but the saving fails on the second attempt. Furthermore, if we simply generate a DataFrame from an in-memory list (so we're not loading from S3, only saving to S3) then the error goes away... I'm not sure how helpful this is.

We're using Java 1.8, Scala 2.10.5, with our Spark codebase at commit 15de51c.

Our exact reproduction steps are:

1. Run a Spark Shell with appropriate dependencies

./spark-shell --packages com.amazonaws:aws-java-sdk:1.10.75,org.apache.hadoop:hadoop-aws:2.7.2,com.databricks:spark-avro_2.10:2.0.1

2. Run the following setup code within the shell

import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import sqlContext.implicits._
import org.apache.spark.sql._
implicit val sqlContext = new org.apache.spark.sql.SQLContext(sc)

val hadoopConf = sc.hadoopConfiguration;
hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
hadoopConf.set("fs.s3.awsAccessKeyId", "...")
hadoopConf.set("fs.s3.awsSecretAccessKey", "...")

val df = sqlContext.read.format("com.databricks.spark.avro").load("s3://bucket/input.avro")

def doWrite() {
    df.write.format("org.apache.spark.sql.parquet").mode(SaveMode.Overwrite).save("s3://bucket/output")
}

3. Run this twice, leaving time for the first execution to finish (so the operations are serialised)

Future { doWrite(); println("SUCCEEDED") }.recover { case e: Throwable => println("FAILED: " + e.getMessage()); e.printStackTrace() }

Result:

spark.sql.execution.id is already set
java.lang.IllegalArgumentException: spark.sql.execution.id is already set
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:87)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:108)
    at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:58)
    at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:56)
    at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:70)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:55)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:55)
    at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:256)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:148)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:139)
    at $line38.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.doWrite(<console>:41)
    at $line40.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply$mcV$sp(<console>:43)
    at $line40.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:43)
    at $line40.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:43)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at scala.concurrent.impl.ExecutionContextImpl$$anon$3.exec(ExecutionContextImpl.scala:107)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

@zsxwing
Member

zsxwing commented May 19, 2016

@ljwagerfield it should be fixed in #11586

@andrewor14
Contributor Author

Yes, unfortunately that fix is only available in the upcoming 2.0, so you will have to upgrade to resolve the problem.

ashangit pushed a commit to ashangit/spark that referenced this pull request Oct 19, 2016

(cherry picked from commit 997be78)